/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <sstream>
#include <list>
#include "rpc/RpcAuth.h"
#include "common/XmlConfig.h"
#include "common/SessionConfig.h"

#include "ApplicationClient.h"
#include "ApplicationMaster.h"
#include "ContainerManagement.h"

#include "LibYarnClient.h"
#include "LibYarnConstants.h"

#include "common/Logger.h"

using namespace Yarn::Internal;

namespace libyarn {

LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort,
        string &schedHost, string &schedPort, string &amHost, int32_t amPort,
        string &am_tracking_url, int heartbeatInterval) :
        amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(
        amHost), amPort(amPort), am_tracking_url(am_tracking_url), heartbeatInterval(
        heartbeatInterval), response_id(0), clientJobId(""), keepRun(
        false), needHeartbeatAlive(false) {
    pthread_mutex_init(&(heartbeatLock), NULL);
    amrmClient = NULL;
    appClient = (void*) new ApplicationClient(user, rmHost, rmPort);
    nmClient = (void*) new ContainerManagement();
}
#ifdef MOCKTEST
LibYarnClient::LibYarnClient(string &user,string &rmHost, string &rmPort, string &schedHost,
        string &schedPort, string &amHost, int32_t amPort,
        string &am_tracking_url, int heartbeatInterval,Mock::TestLibYarnClientStub *stub):
        amUser(user),schedHost(schedHost), schedPort(schedPort), amHost(amHost),
        amPort(amPort), am_tracking_url(am_tracking_url),
        heartbeatInterval(heartbeatInterval),clientJobId(""),
        keepRun(false), needHeartbeatAlive(false) {
    pthread_mutex_init( &(heartbeatLock), NULL );
    libyarnStub = stub;
    appClient = (void*) libyarnStub->getApplicationClient();
    amrmClient = (void*) libyarnStub->getApplicationMaster();
    nmClient = (void*) libyarnStub->getContainerManagement();
}
#endif

LibYarnClient::~LibYarnClient() {
#ifndef MOCKTEST
    if (keepRun) {
        // No need to run heart-beat thread now.
        keepRun = false;
        void *thrc = NULL;
        int rc = pthread_join(heartbeatThread, &thrc);
        if (rc != 0) {
            LOG(DEBUG1, "LibYarnClient::~LibYarnClient, fail to join heart-beat thread. "
                        "error code %d", rc);
        } else {
            LOG(DEBUG1, "LibYarnClient::~LibYarnClient, join heart-beat thread successfully.");
        }
    }
#endif
    if (amrmClient != NULL) {
        delete (ApplicationMaster*) amrmClient;
    }
    delete (ApplicationClient*) appClient;
    delete (ContainerManagement*) nmClient;
}

string LibYarnClient::getErrorMessage() {
    return errorMessage;
}

void LibYarnClient::setErrorMessage(string errorMsg) {
    errorMessage = errorMsg;
}

bool LibYarnClient::isJobHealthy() {
    return keepRun;
}

list<ResourceRequest>& LibYarnClient::getAskRequests() {
    return askRequests;
}

void LibYarnClient::clearAskRequests() {
    LOG(DEBUG1, "LibYarnClient::clear ask requests.");
    askRequests.clear();
}

void* heartbeatFunc(void* args) {
    int failcounter = 0;
    int retry = 2;
    LibYarnClient *client = (LibYarnClient*) args;

    while (client->keepRun) {
        try {
            client->dummyAllocate();
            failcounter = 0;
        } catch (const ApplicationMasterNotRegisteredException &e) {
            /*
             * In case catch this exception,
             * heartbeat thread should exits, and re-register AM.
             */
            LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
                         "catch ApplicationMasterNotRegisteredException. %s",
                         e.msg());
            client->keepRun = false;
            break;
        } catch (const YarnException &e) {
            LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
                         "is not correctly executed with exception raised. %s",
                         e.msg());
            failcounter++;
            if (failcounter > retry) {
                /* In case retry too many times with errors/exceptions, this
                 * thread will return. LibYarn has to re-register application
                 * and start the heartbeat thread again.
                 */
                LOG(WARNING, "LibYarnClient::heartbeatFunc, there are too many "
                             "failures raised. This heart-beat thread exits now.");
                client->keepRun = false;
                break;
            }
        }
        usleep((client->heartbeatInterval) * 1000);
    }

    LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase.");
    return (void *) 0;
}

int LibYarnClient::createJob(string &jobName, string &queue, string &jobId) {
    try {
        //Only one jobId for the client right now.
        if (clientJobId != ""){
            throw std::invalid_argument( "Exist an application for the client");
        }
        ApplicationClient *applicationClient = (ApplicationClient*) appClient;

        //1. getNewApplication
        ApplicationId appId = applicationClient->getNewApplication();
        LOG(DEBUG1, "LibYarnClient::createJob, getNewApplication finished, appId:[clusterTimeStamp:%lld,id:%d]",
                    appId.getClusterTimestamp(), appId.getId());

        //2. submitApplication
        ApplicationSubmissionContext appSubmitCtx;
        appSubmitCtx.setApplicationId(appId);
        appSubmitCtx.setApplicationName(jobName);
        appSubmitCtx.setQueue(queue);

        Priority priority(0);
        appSubmitCtx.setPriority(priority);

        ContainerLaunchContext amContainerSpec;
        appSubmitCtx.setAMContainerSpec(amContainerSpec);

        appSubmitCtx.setUnmanagedAM(true);
        appSubmitCtx.setMaxAppAttempts(1);

        applicationClient->submitApplication(appSubmitCtx);
        LOG(DEBUG1, "LibYarnClient::createJob, submitApplication finished");

        //3. wait util AM is ACCEPTED and return the AMRMToken
        ApplicationReport report;
        int retry = 10;
        while (retry > 0) {
            report = applicationClient->getApplicationReport(appId);
            LOG(DEBUG1, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d], appState:%d",
                        appId.getClusterTimestamp(), appId.getId(), report.getYarnApplicationState());
            if ((report.getAMRMToken().getPassword() != "") &&
                    report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) {
                break;
            } else {
                retry--;
                usleep(1000000L);
            }
            if (retry == 0)
                THROW(AccessControlException, "can not register application to YARN");
        }

        clientAppId = appId;
        clientAppAttempId = report.getCurrentAppAttemptId();

        //4.1 new ApplicationMaster
        Token token = report.getAMRMToken();
        UserInfo user;
        if (applicationClient->getMethod() == SIMPLE) {
            user = UserInfo::LocalUser();
        } else if (applicationClient->getMethod() == KERBEROS) {
            user.setEffectiveUser(applicationClient->getPrincipal());
            user.setRealUser(applicationClient->getUser());
        } else {
            LOG(WARNING, "LibYarnClient::createJob: unsupported RPC method:%d. ",
                         applicationClient->getMethod());
        }

        Yarn::Token AMToken;
        AMToken.setIdentifier(token.getIdentifier());
        AMToken.setKind(token.getKind());
        AMToken.setPassword(token.getPassword());
        AMToken.setService(token.getService());

        user.addToken(AMToken);
#ifndef MOCKTEST
        amrmClient = (void*) new ApplicationMaster(this->schedHost, this->schedPort,
                user, token.getService());
#endif

        //4.2 register to RM scheduler as AM
        ((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost, amPort, am_tracking_url);
        LOG(DEBUG1, "LibYarnClient::createJob, registerApplicationMaster finished");

#ifndef MOCKTEST
        keepRun = true;
        //5. setup the heartbeat thread to allocate, release, heartbeat
        int rc = pthread_create(&heartbeatThread, NULL, heartbeatFunc, this);
        if (rc != 0) {
            keepRun = false;
            LOG(WARNING, "LibYarnClient::createJob, fail to create heart-beat thread. "
                         "error code %d", rc);
            throw std::runtime_error("Fail to create heart-beat thread.");
        }
        needHeartbeatAlive = true;
#endif

        LOG(DEBUG1, "LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started");
        //6. return jobId
        std::stringstream ss;
        ss << "job_" << appId.getClusterTimestamp() << "_" << appId.getId();
        jobId = ss.str();

        LOG(INFO, "LibYarnClient::createJob, appId[cluster_timestamp:%lld,id:%d]",
                  clientAppId.getClusterTimestamp(), clientAppId.getId());
        clientJobId = jobId;
        return FR_SUCCEEDED;
    } catch (const YarnNetworkConnectException &e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::createJob, catch network connection exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (const std::exception &e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::createJob, catch exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::createJob, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::forceKillJob(string &jobId) {

#ifndef MOCKTEST
    if (keepRun) {
        keepRun = false;
        void *thrc = NULL;
        int rc = pthread_join(heartbeatThread, &thrc);
        if (rc != 0) {
            LOG(WARNING, "LibYarnClient::forceKillJob, fail to join heart-beat thread. "
                         "error code %d", rc);
            return FR_FAILED;
        } else {
            LOG(INFO, "LibYarnClient::forceKillJob, join heart-beat thread successfully.");
        }
    }
#endif

    try{
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong, please check the jobId argument");
        }

        needHeartbeatAlive = false;
        for (map<int64_t, Container*>::iterator it = jobIdContainers.begin();
                it != jobIdContainers.end(); it++) {
            std::ostringstream key;
            Container *container = it->second;
            key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
            Token nmToken = nmTokenCache[key.str()];
            ((ContainerManagement*) nmClient)->stopContainer((*container), nmToken);
            LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld is stopped",container->getId().getId());
        }

        ((ApplicationClient*) appClient)->forceKillApplication(clientAppId);
        LOG(INFO, "LibYarnClient::force to kill this application.");

        for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
                it != jobIdContainers.end(); it++) {
            LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld in jobIdContainers is deleted",
                      it->second->getId().getId());
            delete it->second;
            it->second = NULL;
        }
        jobIdContainers.clear();
        activeFailContainerIds.clear();
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::forceKillJob, catch the exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::forceKillJob, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}


void LibYarnClient::dummyAllocate() {

    pthread_mutex_lock(&heartbeatLock);
    ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;

    //1) requestProto_blank
    list<ResourceRequest> asksBlank;
    //2) releasesBlank
    list<ContainerId> releasesBlank;
    //3) blacklistRequestBlank
    ResourceBlacklistRequest blacklistRequestBlank;
    //4) progress
    float progress = 0.5;
    int allocatedNum = 0;

    try {
        LOG(DEBUG1, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat with response_id:%d", response_id);
        AllocateResponse response = amrmClientAlias->allocate(asksBlank,
                releasesBlank, blacklistRequestBlank, response_id, progress);
        response_id = response.getResponseId();
        list<Container> allocatedContainers = response.getAllocatedContainers();
        allocatedNum = allocatedContainers.size();
        LOG(DEBUG1, "LibYarnClient::dummyAllocate returned response_id :%d", response_id);
        if (allocatedNum > 0) {
            /*
             * In rare case, client gets allocated containers in heartbeat,
             * free them immediately.
             */
            LOG(DEBUG1, "LibYarnClient::dummyAllocate returned allocated size: %d, "
                        "free them immediately.", allocatedNum);
            list<ContainerId> releases;
            for (list<Container>::iterator it = allocatedContainers.begin();
                 it != allocatedContainers.end(); it++) {
                releases.push_back((*it).getId());
            }
            list<ResourceRequest> asksBlank;
            ResourceBlacklistRequest blacklistRequestBlank;
            response = amrmClientAlias->allocate(asksBlank, releases,
                    blacklistRequestBlank, response_id, progress);
            response_id = response.getResponseId();
        }
        pthread_mutex_unlock(&heartbeatLock);
    }
    catch (const YarnException &e) {
        LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation "
                     "is not correctly executed with exception raised. %s",
                     e.msg());
        pthread_mutex_unlock(&heartbeatLock);
        throw;
    }
}

void LibYarnClient::addResourceRequest(Resource capability,
        int32_t num_containers, string host, int32_t priority,
        bool relax_locality)
{
    ResourceRequest *req = new ResourceRequest();
    req->setCapability(capability);
    req->setNumContainers(num_containers);
    Priority priorityProto;
    priorityProto.setPriority(priority);
    req->setPriority(priorityProto);
    req->setResourceName(host);
    req->setRelaxLocality(relax_locality);
    try {
        askRequests.push_back(*req);
        LOG(DEBUG1, "LibYarnClient::put a request into ask list, "
                    "mem:%d, cpu:%d, priority:%d, resource name:%s, relax:%d, num_containers:%d",
                    capability.getMemory(), capability.getVirtualCores(), priority, host.c_str(),
                    relax_locality, num_containers);
    } catch (std::exception &e) {
        LOG(WARNING, "LibYarnClient::Fail to add a resource request "
                     "to ask list. %s ", e.what());
    }
}

/*
 * This function creates resource requests according to the requirement of the caller, then put these
 * requests into a list. The logic is a little similar to java client AMRMClient:addContainerRequest().
 * There are three level nodes in YARN: off-switch(aka ANY), rack, host.By now, only ANY-level and host-level
 * is supported.
 *
 * Parameters:
 *      jobId: jobId
 *      capability: the quota of the resource
 *      count: the required number of the containers
 *      preferred: node list, NULL means ANY host. If one node's rack name is NULL, a default rack name is set.
 *      priority: priority
 */
int LibYarnClient::addContainerRequests(string &jobId, Resource &capability,
        int32_t num_containers, list<LibYarnNodeInfo> &preferred,
        int32_t priority, bool relax_locality)
{
    try {
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong, check the jobId argument");
        }

        map<string, int32_t> inferredRacks;

        for (list<LibYarnNodeInfo>::iterator iter = preferred.begin();
                iter != preferred.end(); iter++) {
            LOG(DEBUG1, "LibYarnClient::addContainerRequests, "
                        "get a preferred host info, host:%s,rack:%s,container number:%d",
                        iter->getHost().c_str(), iter->getRack().c_str(), iter->getContainerNum());
            /* add a resource request for this node */
            addResourceRequest(capability, iter->getContainerNum(), iter->getHost(), priority, true);
            map<string, int32_t>::iterator it = inferredRacks.find(iter->getRack());
            if (it != inferredRacks.end())
                it->second += iter->getContainerNum();
            else
                inferredRacks.insert(make_pair(iter->getRack(), iter->getContainerNum()));
        }

        /* add resource requests for racks*/
        for (map<string, int32_t>::iterator it = inferredRacks.begin();
             it != inferredRacks.end(); it++) {
            addResourceRequest(capability, it->second, it->first, priority, relax_locality);
        }

        /* add resource request for off-switch */
        addResourceRequest(capability, num_containers, YARN_HOST_ANY, priority, relax_locality);

        return FR_SUCCEEDED;
    } catch (std::exception &e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::addContainerRequests catch std exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::addContainerRequests catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

/*
 message AllocateRequestProto {
 repeated ResourceRequestProto ask = 1;
 repeated ContainerIdProto release = 2;
 optional ResourceBlacklistRequestProto blacklist_request = 3;
 optional int32 response_id = 4;
 optional float progress = 5;
 }

 message AllocateResponseProto {
 optional AMCommandProto a_m_command = 1;
 optional int32 response_id = 2;
 repeated ContainerProto allocated_containers = 3;
 repeated ContainerStatusProto completed_container_statuses = 4;
 optional ResourceProto limit = 5;
 repeated NodeReportProto updated_nodes = 6;
 optional int32 num_cluster_nodes = 7;
 optional PreemptionMessageProto preempt = 8;
 repeated NMTokenProto nm_tokens = 9;
 }
 */
int LibYarnClient::allocateResources(string &jobId,
        list<string> &blackListAdditions, list<string> &blackListRemovals,
        list<Container> &allocatedContainers, int32_t num_containers)
{
    try{
        AllocateResponse response;
        int retry = 5;
        int allocatedNumOnce = 0;
        int allocatedNumTotal = 0;

        pthread_mutex_lock(&heartbeatLock);
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong, check the jobId argument");
        }

        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
        list<Container> allocatedContainerCache;
        list<ContainerReport> preContainerReports;
        preContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);

        list<ContainerId> releasesBlank;
        ResourceBlacklistRequest blacklistRequest;
        blacklistRequest.setBlacklistAdditions(blackListAdditions);
        blacklistRequest.setBlacklistRemovals(blackListRemovals);
        float progress = 0.5;

        LOG(DEBUG1, "LibYarnClient::request to allocate resource, container number:%d",
                  num_containers);

        while (retry > 0) {
            LOG(DEBUG1, "LibYarnClient::allocate with response id : %d", response_id);
            AllocateResponse response = amrmClientAlias->allocate(
                    this->askRequests, releasesBlank, blacklistRequest,
                    response_id, progress);
            response_id = response.getResponseId();
            LOG(DEBUG1, "LibYarnClient::allocate returned response id : %d", response_id);
            list<NMToken> nmTokens = response.getNMTokens();
            for (list<NMToken>::iterator it = nmTokens.begin(); it != nmTokens.end(); it++) {
                std::ostringstream oss;
                oss << (*it).getNodeId().getHost() << ":" << (*it).getNodeId().getPort();
                nmTokenCache[oss.str()] = (*it).getToken();
            }
            this->clearAskRequests();
            list<Container> allocatedContainerOnce = response.getAllocatedContainers();
            allocatedNumOnce = allocatedContainerOnce.size();
            if (allocatedNumOnce <= 0) {
                LOG(DEBUG1, "LibYarnClient:: fail to allocate from YARN RM, try again");
                retry--;
                if(retry == 0 && allocatedNumTotal == 0) {
                    /* If failed, just return to Resource Broker to handle*/
                    pthread_mutex_unlock(&heartbeatLock);
                    LOG(WARNING,"LibYarnClient:: fail to allocate from YARN RM after retry several times");
                    return FR_SUCCEEDED;
                }
            } else {
                allocatedNumTotal += allocatedNumOnce;
                allocatedContainerCache.insert(allocatedContainerCache.end(), allocatedContainerOnce.begin(), allocatedContainerOnce.end());
                LOG(DEBUG1, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce);
                if (allocatedNumTotal >= num_containers) {
                    LOG(DEBUG1, "LibYarnClient:: allocate enough containers from YARN RM, "
                                "expected:%d, total:%d", num_containers, allocatedNumTotal);
                    break;
                }

            }
            usleep(TimeInterval::ALLOCATE_INTERVAL_MS);
        }

        LOG(INFO,"LibYarnClient::allocate resource, response_id:%d, allocated container number:%d",
                 response_id, allocatedNumTotal);

        /* a workaround for allocate more container than request */
        list<ContainerId> releases;
        list<ContainerReport> afterContainerReports;
        afterContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
        for (list<ContainerReport>::iterator ait = afterContainerReports.begin();
                ait != afterContainerReports.end(); ait++) {
            bool foundInPre = false;
            for (list<ContainerReport>::iterator pit =
                    preContainerReports.begin();
                    pit != preContainerReports.end(); pit++) {
                if (pit->getId().getId() == ait->getId().getId()) {
                    foundInPre = true;
                    break;
                }
            }
            if (!foundInPre) {
                bool foundInNewAllocated = false;
                for (list<Container>::iterator cit =
                        allocatedContainerCache.begin();
                        cit != allocatedContainerCache.end(); cit++) {
                    if (cit->getId().getId() == ait->getId().getId()) {
                        foundInNewAllocated = true;
                        break;
                    }
                }
                if (!foundInNewAllocated) {
                    releases.push_back((*ait).getId());
                }
            }
        }

        int totalNeedRelease = allocatedContainerCache.size() - num_containers;
        LOG(DEBUG1, "LibYarnClient::allocateResources, total_allocated_containers:%ld, total_need_release:%d",
                    allocatedContainerCache.size(), totalNeedRelease);
        if(totalNeedRelease > 0) {
            for (int i = 0; i < totalNeedRelease; i++) {
                list<Container>::iterator it = allocatedContainerCache.begin();
                releases.push_back((*it).getId());
                allocatedContainerCache.erase(it);
            }

            list<ResourceRequest> asksBlank;
            ResourceBlacklistRequest blacklistRequestBlank;
            response = amrmClientAlias->allocate(asksBlank, releases,
                    blacklistRequestBlank, response_id, progress);
            response_id = response.getResponseId();
        }

        /* 3. store allocated containers */
        for (list<Container>::iterator it = allocatedContainerCache.begin();
                it != allocatedContainerCache.end(); it++) {
            Container *container = new Container((*it));
            int64_t containerId = container->getId().getId();
            jobIdContainers[containerId] = container;
        }
        allocatedContainers = allocatedContainerCache;

        LOG(DEBUG1, "LibYarnClient::allocateResources, put all allocated containers size:%ld",
                    allocatedContainerCache.size());

        pthread_mutex_unlock(&heartbeatLock);

        return FR_SUCCEEDED;
    } catch(std::exception &e) {
        std::stringstream errorMsg;

        errorMsg << "LibYarnClient::allocateResources, catch exception:" << e.what();
        setErrorMessage(errorMsg.str());

        pthread_mutex_unlock(&heartbeatLock);
        return FR_FAILED;
    } catch (const ApplicationMasterNotRegisteredException &e) {
        std::stringstream errorMsg;

        errorMsg << "LibYarnClient::allocateResources, "
                    "catch ApplicationMasterNotRegisteredException." << e.what();
        setErrorMessage(errorMsg.str());

        pthread_mutex_unlock(&heartbeatLock);
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;

        errorMsg << "LibYarnClient::allocateResources, catch unexpected exception.";
        setErrorMessage(errorMsg.str());

        pthread_mutex_unlock(&heartbeatLock);
        return FR_FAILED;
    }
}

int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], int releaseContainerSize)
{
    try{
        pthread_mutex_lock(&heartbeatLock);
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }

        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
        //1) asksBlank
        list<ResourceRequest> asksBlank;
        //2) releases
        list<ContainerId> releases;
        for (int i = 0; i < releaseContainerSize; i++) {
            int64_t containerId = releaseContainerIds[i];
            map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
            if (it != jobIdContainers.end()) {
                releases.push_back((it->second)->getId());
            }
        }
        //3) blacklistRequestBlank
        ResourceBlacklistRequest blacklistRequestBlank;
        //4) progress
        float progress = 0.5;

        AllocateResponse response = amrmClientAlias->allocate(asksBlank,
                releases, blacklistRequestBlank, response_id, progress);
        response_id = response.getResponseId();
        //erase from the map jobIdContainers
        for (list<ContainerId>::iterator it = releases.begin(); it != releases.end(); it++) {
            LOG(DEBUG1, "LibYarnClient::releaseResource, released ContainerId:%ld",
                      it->getId());
            map<int64_t, Container*>::iterator cit = jobIdContainers.find(it->getId());
            if (cit != jobIdContainers.end()) {
                delete cit->second;
                cit->second = NULL;
                jobIdContainers.erase(it->getId());
            }
            //erase the element if in activeFailContainers
            set<int64_t>::iterator sit = activeFailContainerIds.find(it->getId());
            if (sit != activeFailContainerIds.end()) {
                LOG(INFO, "LibYarnClient::releaseResource, remove %ld from  activeFailContainerIds",
                          (*sit));
                activeFailContainerIds.erase(*sit);
            }
        }
        LOG(INFO, "LibYarnClient::release resources, container number:%d",
                  releases.size());
        pthread_mutex_unlock(&heartbeatLock);
        return FR_SUCCEEDED;
    } catch (std::exception &e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::releaseResources, catch exception:" << e.what();
        setErrorMessage(errorMsg.str());
        pthread_mutex_unlock(&heartbeatLock);
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::releaseResources, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        pthread_mutex_unlock(&heartbeatLock);
        return FR_FAILED;
    }
}

/*
 ---------------------
 message StartContainerRequestProto {
   optional ContainerLaunchContextProto container_launch_context = 1;
   optional hadoop.common.TokenProto container_token = 2;
 }

 message StartContainerResponseProto {
   repeated StringBytesMapProto services_meta_data = 1;
 }
 ---------------------
 rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
 ---------------------
 message StartContainersRequestProto {
   repeated StartContainerRequestProto start_container_request = 1;
 }

 message StartContainersResponseProto {
   repeated StringBytesMapProto services_meta_data = 1;
   repeated ContainerIdProto succeeded_requests = 2;
   repeated ContainerExceptionMapProto failed_requests = 3;
 }
 */
int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],int activeContainerSize) {
    try{
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }
        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        LOG(DEBUG1, "LibYarnClient::activeResources, activeResources started");

        for (int i = 0; i < activeContainerSize; i++) {
            int64_t containerId = activeContainerIds[i];
            map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
            if (it != jobIdContainers.end()) {
                try {
                    Container *container = it->second;
                    std::ostringstream key;
                    key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();

                    Token nmToken = nmTokenCache[key.str()];
                    string cmd("sleep 10000000000");
                    list<string> cmds;
                    cmds.push_back(cmd);
                    ContainerLaunchContext ctx;
                    ctx.setCommand(cmds);
                    StartContainerRequest request;
                    request.setContainerLaunchCtx(ctx);
                    Token cToken = container->getContainerToken();
                    request.setContainerToken(cToken);
                    LOG(DEBUG1, "LibYarnClient::activeResources active containerId:%ld", containerId);
                    ((ContainerManagement*)nmClient)->startContainer((*container), request, nmToken);
                } catch (std::exception& e) {
                    LOG(WARNING, "LibYarnClient::activeResources, activeResources Failed Id:%ld,exception:%s",
                                 containerId,e.what());
                    activeFailContainerIds.insert(containerId);
                }
            }
        }

        LOG(INFO, "LibYarnClient::active resources, container number:%d",
                  activeContainerSize);
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::activeResources, Catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::activeResources, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}
int LibYarnClient::getActiveFailContainerIds(set<int64_t> &activeFailIds){
    activeFailIds = activeFailContainerIds;
    return FR_SUCCEEDED;
}

int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) {

#ifndef MOCKTEST
    if (keepRun) {
        // No need to run heart-beat thread now.
        keepRun = false;
        void *thrc = NULL;
        int rc = pthread_join(heartbeatThread, &thrc);
        if (rc != 0) {
            LOG(WARNING, "LibYarnClient::finishJob, fail to join heart-beat thread. "
                         "error code %d", rc);
            return FR_FAILED;
        } else {
            LOG(INFO, "LibYarnClient::finishJob, join heart-beat thread successfully.");
        }
    }
#endif

    try {
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }
        needHeartbeatAlive = false;

        //1. we should stop all containers related with this job
        for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
            std::ostringstream key;
            Container *container = it->second;
            key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
            Token nmToken = nmTokenCache[key.str()];
            ((ContainerManagement*) nmClient)->stopContainer((*container), nmToken);
            LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld is stopped",container->getId().getId());
        }
        LOG(DEBUG1, "LibYarnClient::finishJob, all containers for jobId:%s are stopped",jobId.c_str());
        //2. finish AM
        string diagnostics("");
        string tracking_url("");
        ((ApplicationMaster*) amrmClient)->finishApplicationMaster(diagnostics, tracking_url, finalStatus);
        LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s, finalStatus:%d", jobId.c_str(), finalStatus);
        //free the Container* memory
        for (map<int64_t, Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) {
            LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld in jobIdContainers is deleted",
                        it->second->getId().getId());
            delete it->second;
            it->second = NULL;
        }
        jobIdContainers.clear();
        activeFailContainerIds.clear();

        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::finishJob, catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (const ApplicationMasterNotRegisteredException &e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::finishJob, "
                    "catch ApplicationMasterNotRegisteredException." << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::finishJob, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applicationReport) {
    try {
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }

        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]",
                    clientAppId.getClusterTimestamp(), clientAppId.getId());
        applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId);
        LOG(DEBUG1, "LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d",
                    applicationReport.getApplicationId().getClusterTimestamp(),
                    applicationReport.getApplicationId().getId(),
                    applicationReport.getCurrentAppAttemptId().getAttemptId());

        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getApplicationReport, Catch the Exception:"
                << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getApplicationReport, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &containerReports) {
    try {
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }

        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        LOG(DEBUG1, "LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]",
                    clientAppId.getClusterTimestamp(), clientAppId.getId());
        containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getContainerReports, catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getContainerReports, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::getContainerStatuses(string &jobId, int64_t containerIds[],
        int containerSize, list<ContainerStatus> &containerStatues)
{
    try {
        if (jobId != clientJobId) {
            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
        }

        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        for (int i = 0; i < containerSize; i++) {
            int64_t containerId = containerIds[i];
            map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
            if (it != jobIdContainers.end()) {
                try {
                    Container *container = it->second;
                    std::ostringstream key;
                    key << container->getNodeId().getHost() << ":"<< container->getNodeId().getPort();
                    Token nmToken = nmTokenCache[key.str()];
                    ContainerStatus containerStatus = ((ContainerManagement*) nmClient)->getContainerStatus((*container),  nmToken);
                    // the response containerId will be 0 if the request containerId is not exist
                    if (containerStatus.getContainerId().getId() != 0){
                        containerStatues.push_back(containerStatus);
                    }
                } catch (std::exception& e) {
                    LOG(INFO, "LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%ld,exception:%s",
                              containerId, e.what());
                }
            }
        }
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getContainerStatuses, Catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getContainerStatuses, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
        bool includeChildQueues, bool recursive, QueueInfo &queueInfo)
{
    try {
        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }

        queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue,
                includeApps, includeChildQueues, recursive);
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getQueueInfo, Catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getQueueInfo, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}

int LibYarnClient::getClusterNodes(list<NodeState> &states,list<NodeReport> &nodeReports) {
    try{
        if (!keepRun && needHeartbeatAlive) {
            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
        }
        nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states);
        return FR_SUCCEEDED;
    } catch (std::exception& e) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getClusterNodes, Catch the Exception:" << e.what();
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    } catch (...) {
        std::stringstream errorMsg;
        errorMsg << "LibYarnClient::getClusterNodes, catch unexpected exception.";
        setErrorMessage(errorMsg.str());
        return FR_FAILED;
    }
}
}
